{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {},
   "outputs": [],
   "source": [
    "import urllib\n",
    "import json\n",
    "import requests\n",
    "import dateutil.parser\n",
    "import datetime\n",
    "from datetime import datetime\n",
    "from TdAmeritradeStream import TDAuthentication\n",
    "from config import password, account_number, client_id\n",
    "\n",
    "\n",
    "# Alternative method for storing login info, if you choose not to use a config file.\n",
    "# --------------------------------------\n",
    "# password = 'YOUR PASSWORD'\n",
    "# account_number = 'YOUR ACCOUNT NUMBER'\n",
    "# client_id = 'YOUR CLIENT ID'\n",
    "# --------------------------------------"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from splinter import Browser\n",
    "    \n",
    "def get_access_token():\n",
    "    \n",
    "    # define the location of the Chrome Driver - YOU MUST CHANGE THE PATH SO IT POINTS TO YOUR CHROMEDRIVER\n",
    "    executable_path = {'executable_path': r'PATH_TO_CHROMEDRIVER'}\n",
    "\n",
    "    # Create a new instance of the browser, make sure we can see it (Headless = False)\n",
    "    browser = Browser('chrome', **executable_path, headless=True)\n",
    "\n",
    "    # define the components to build a URL\n",
    "    method = 'GET'\n",
    "    url = 'https://auth.tdameritrade.com/auth?'\n",
    "    client_code = client_id + '@AMER.OAUTHAP'\n",
    "    payload = {'response_type':'code', 'redirect_uri':'http://localhost/test', 'client_id':client_code}\n",
    "\n",
    "    # build the URL and store it in a new variable\n",
    "    p = requests.Request(method, url, params=payload).prepare()\n",
    "    myurl = p.url\n",
    "\n",
    "    # go to the URL\n",
    "    browser.visit(myurl)\n",
    "\n",
    "    # define items to fillout form\n",
    "    payload = {'username': account_number,\n",
    "               'password': password}\n",
    "\n",
    "    # fill out each part of the form and click submit\n",
    "    username = browser.find_by_id(\"username\").first.fill(payload['username'])\n",
    "    password = browser.find_by_id(\"password\").first.fill(payload['password'])\n",
    "    submit   = browser.find_by_id(\"accept\").first.click()\n",
    "\n",
    "    # click the Accept terms button\n",
    "    browser.find_by_id(\"accept\").first.click() \n",
    "\n",
    "    # give it a second, then grab the url\n",
    "    time.sleep(1)\n",
    "    new_url = browser.url\n",
    "    parse_url = urllib.parse.unquote(new_url.split('code=')[1])\n",
    "\n",
    "    # close the browser\n",
    "    browser.quit()\n",
    "    \n",
    "    print(\"Pulled Code, grabbing access token.\")\n",
    "    \n",
    "    # THE AUTHENTICATION ENDPOINT\n",
    "\n",
    "    # define the endpoint\n",
    "    url = r\"https://api.tdameritrade.com/v1/oauth2/token\"\n",
    "\n",
    "    # define the headers\n",
    "    headers = {\"Content-Type\":\"application/x-www-form-urlencoded\"}\n",
    "\n",
    "    # define the payload\n",
    "    payload = {'grant_type': 'authorization_code', \n",
    "               'access_type': 'offline', \n",
    "               'code': parse_url, \n",
    "               'client_id':client_id, \n",
    "               'redirect_uri':'http://localhost/test'}\n",
    "\n",
    "    # post the data to get the token\n",
    "    authReply = requests.post(r'https://api.tdameritrade.com/v1/oauth2/token', headers = headers, data=payload)\n",
    "\n",
    "    # convert it to a dictionary\n",
    "    decoded_content = authReply.json()    \n",
    "    \n",
    "    # grab the access_token\n",
    "    access_token = decoded_content['access_token']\n",
    "    headers = {'Authorization': \"Bearer {}\".format(access_token)}\n",
    "    \n",
    "    return headers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [],
   "source": [
    "def unix_time_millis(dt):\n",
    "    \n",
    "    # grab the starting point, so time '0'\n",
    "    epoch = datetime.datetime.utcfromtimestamp(0)\n",
    "    \n",
    "    return (dt - epoch).total_seconds() * 1000.0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# we need to go to the User Principals endpoint to get the info we need to make a streaming request\n",
    "endpoint = \"https://api.tdameritrade.com/v1/userprincipals\"\n",
    "\n",
    "# get our access token\n",
    "headers = get_access_token()\n",
    "\n",
    "# this endpoint, requires fields which are separated by ','\n",
    "params = {'fields':'streamerSubscriptionKeys,streamerConnectionInfo'}\n",
    "\n",
    "# make a request\n",
    "content = requests.get(url = endpoint, params = params, headers = headers)\n",
    "userPrincipalsResponse = content.json()\n",
    "\n",
    "# we need to get the timestamp in order to make our next request, but it needs to be parsed\n",
    "tokenTimeStamp = userPrincipalsResponse['streamerInfo']['tokenTimestamp']\n",
    "date = dateutil.parser.parse(tokenTimeStamp, ignoretz = True)\n",
    "tokenTimeStampAsMs = unix_time_millis(date)\n",
    "\n",
    "# we need to define our credentials that we will need to make our stream\n",
    "credentials = {\"userid\": userPrincipalsResponse['accounts'][0]['accountId'],\n",
    "               \"token\": userPrincipalsResponse['streamerInfo']['token'],\n",
    "               \"company\": userPrincipalsResponse['accounts'][0]['company'],\n",
    "               \"segment\": userPrincipalsResponse['accounts'][0]['segment'],\n",
    "               \"cddomain\": userPrincipalsResponse['accounts'][0]['accountCdDomainId'],\n",
    "               \"usergroup\": userPrincipalsResponse['streamerInfo']['userGroup'],\n",
    "               \"accesslevel\":userPrincipalsResponse['streamerInfo']['accessLevel'],\n",
    "               \"authorized\": \"Y\",\n",
    "               \"timestamp\": int(tokenTimeStampAsMs),\n",
    "               \"appid\": userPrincipalsResponse['streamerInfo']['appId'],\n",
    "               \"acl\": userPrincipalsResponse['streamerInfo']['acl'] }\n",
    "\n",
    "userPrincipalsResponse"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# define a request\n",
    "login_request = {\"requests\": [{\"service\": \"ADMIN\",\n",
    "                              \"requestid\": \"0\",  \n",
    "                              \"command\": \"LOGIN\",\n",
    "                              \"account\": userPrincipalsResponse['accounts'][0]['accountId'],\n",
    "                              \"source\": userPrincipalsResponse['streamerInfo']['appId'],\n",
    "                              \"parameters\": {\"credential\": urllib.parse.urlencode(credentials),\n",
    "                                             \"token\": userPrincipalsResponse['streamerInfo']['token'],\n",
    "                                             \"version\": \"1.0\"}}]}\n",
    "\n",
    "\n",
    "# define a request for different data sources\n",
    "data_request= {\"requests\": [{\"service\": \"ACTIVES_NASDAQ\", \n",
    "                             \"requestid\": \"1\", \n",
    "                             \"command\": \"SUBS\", \n",
    "                             \"account\": userPrincipalsResponse['accounts'][0]['accountId'], \n",
    "                             \"source\": userPrincipalsResponse['streamerInfo']['appId'], \n",
    "                             \"parameters\": {\"keys\": \"NASDAQ-60\", \n",
    "                                            \"fields\": \"0,1\"}},\n",
    "                            {\"service\": \"LEVELONE_FUTURES\",\n",
    "                             \"requestid\": \"2\",\n",
    "                             \"command\": \"SUBS\",\n",
    "                             \"account\": userPrincipalsResponse['accounts'][0]['accountId'],\n",
    "                             \"source\": userPrincipalsResponse['streamerInfo']['appId'],\n",
    "                             \"parameters\": {\"keys\": \"/ES\",\n",
    "                                            \"fields\": \"0,1,2,3,4\"}}]}\n",
    "\n",
    "\n",
    "# create it into a JSON string, as the API expects a JSON string.\n",
    "login_encoded = json.dumps(login_request)\n",
    "data_encoded = json.dumps(data_request)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import websockets\n",
    "import asyncio\n",
    "import pyodbc\n",
    "\n",
    "class WebSocketClient():\n",
    "\n",
    "    def __init__(self):\n",
    "        self.data_holder = []\n",
    "        self.file = open('td_ameritrade_data.txt', 'a')\n",
    "        self.cnxn = None\n",
    "        self.crsr = None\n",
    "        \n",
    "    def database_connect(self):\n",
    "        \n",
    "        # define the server and the database, YOU WILL NEED TO CHANGE THIS TO YOUR OWN DATABASE AND SERVER\n",
    "        server = 'YOUR_SERVER' \n",
    "        database = 'YOUR_DATABASE'  \n",
    "        sql_driver = '{ODBC Driver 17 for SQL Server}'\n",
    "\n",
    "        # define our connection, autocommit MUST BE SET TO TRUE, also we can edit data.\n",
    "        self.cnxn = pyodbc.connect(driver = sql_driver, \n",
    "                                   server = server, \n",
    "                                   database = database, \n",
    "                                   trusted_connection ='yes')\n",
    "\n",
    "        self.crsr = self.cnxn.cursor()\n",
    "        \n",
    "    def database_insert(self, query, data_tuple):   \n",
    "        \n",
    "        # execute the query, commit the changes, and close the connection\n",
    "        self.crsr.execute(query, data_tuple)\n",
    "        self.cnxn.commit()\n",
    "        self.cnxn.close()\n",
    "        \n",
    "        print('Data has been successfully inserted into the database.')\n",
    "\n",
    "    async def connect(self):\n",
    "        '''\n",
    "            Connecting to webSocket server\n",
    "            websockets.client.connect returns a WebSocketClientProtocol, which is used to send and receive messages\n",
    "        '''\n",
    "        \n",
    "        # define the URI of the data stream, and connect to it.\n",
    "        uri = \"wss://\" + userPrincipalsResponse['streamerInfo']['streamerSocketUrl'] + \"/ws\"\n",
    "        self.connection = await websockets.client.connect(uri)\n",
    "        \n",
    "        # if all goes well, let the user know.\n",
    "        if self.connection.open:\n",
    "            print('Connection established. Client correctly connected')\n",
    "            return self.connection\n",
    "\n",
    "\n",
    "    async def sendMessage(self, message):\n",
    "        '''\n",
    "            Sending message to webSocket server\n",
    "        '''\n",
    "        await self.connection.send(message)\n",
    "        \n",
    "\n",
    "    async def receiveMessage(self, connection):\n",
    "        '''\n",
    "            Receiving all server messages and handle them\n",
    "        '''\n",
    "        while True:\n",
    "            try:\n",
    "                \n",
    "                # grab and decode the message\n",
    "                message = await connection.recv()                \n",
    "                message_decoded = json.loads(message)\n",
    "                \n",
    "                # prepare data for insertion, connect to database\n",
    "                query = \"INSERT INTO td_service_data (service, timestamp, command) VALUES (?,?,?);\"\n",
    "                self.database_connect()\n",
    "                \n",
    "                # check if the response contains a key called data if so then it contains the info we want to insert.\n",
    "                if 'data' in message_decoded.keys():\n",
    "                    \n",
    "                    # grab the data\n",
    "                    data = message_decoded['data'][0]\n",
    "                    data_tuple = (data['service'], str(data['timestamp']), data['command'])\n",
    "                    \n",
    "                    # insert the data\n",
    "                    self.database_insert(query, data_tuple)\n",
    "                    \n",
    "                print('-'*20)\n",
    "                print('Received message from server: ' + str(message))\n",
    "                \n",
    "            except websockets.exceptions.ConnectionClosed:            \n",
    "                print('Connection with server closed')\n",
    "                break\n",
    "\n",
    "                \n",
    "    async def heartbeat(self, connection):\n",
    "        '''\n",
    "            Sending heartbeat to server every 5 seconds\n",
    "            Ping - pong messages to verify connection is alive\n",
    "        '''\n",
    "        while True:\n",
    "            try:\n",
    "                await connection.send('ping')\n",
    "                await asyncio.sleep(5)\n",
    "            except websockets.exceptions.ConnectionClosed:\n",
    "                print('Connection with server closed')\n",
    "                break\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nest_asyncio\n",
    "nest_asyncio.apply()\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    \n",
    "    # Creating client object\n",
    "    client = WebSocketClient()\n",
    "    \n",
    "    loop = asyncio.get_event_loop()\n",
    "    \n",
    "    # Start connection and get client connection protocol\n",
    "    connection = loop.run_until_complete(client.connect())\n",
    "    \n",
    "    # Start listener and heartbeat \n",
    "    tasks = [asyncio.ensure_future(client.receiveMessage(connection)),\n",
    "             asyncio.ensure_future(client.sendMessage(login_encoded)),\n",
    "             asyncio.ensure_future(client.receiveMessage(connection)),\n",
    "             asyncio.ensure_future(client.sendMessage(data_encoded)),\n",
    "             asyncio.ensure_future(client.receiveMessage(connection))]\n",
    "\n",
    "    loop.run_until_complete(asyncio.wait(tasks))\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
